package com.atguigu.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.atguigu.bean.TrafficHomeDetailPageViewBean;
import com.atguigu.common.Constant;
import com.atguigu.util.DateFormatUtil;
import com.atguigu.util.DorisUtil;
import com.atguigu.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

//数据流:web/app -> nginx -> 日志服务器(log文件) -> Flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> Doris
//程  序:Mock -> Flume(f1.sh) -> Kafka(ZK) -> Dwd01_TrafficBaseLogSplit -> Kafka(ZK) -> Dws03_TrafficHomeDetailPageViewWindow -> Doris(DWS)
/**
 * Flink job that counts daily unique visitors of the "home" and "good_detail" pages.
 *
 * <p>Pipeline, as wired in {@link #main(String[])}:
 * <ol>
 *   <li>read page-log strings from the Kafka topic {@code Constant.TOPIC_DWD_TRAFFIC_PAGE};</li>
 *   <li>parse them as JSON and keep only well-formed "home" / "good_detail" page views;</li>
 *   <li>assign event-time timestamps from the {@code ts} field (2 s bounded out-of-orderness);</li>
 *   <li>key by {@code common.mid} and emit a bean the first time a device is seen on a page
 *       for a given date;</li>
 *   <li>sum the beans in 10 s tumbling event-time windows over the whole stream;</li>
 *   <li>print the result and write it, as snake_case JSON, to Doris.</li>
 * </ol>
 */
public class Dws03_TrafficHomeDetailPageViewWindow {

    /**
     * Shared fastjson configuration that renders bean property names in snake_case.
     * Built once per JVM instead of once per record, so fastjson's per-config
     * serializer cache is actually reused.
     */
    private static final SerializeConfig SNAKE_CASE_CONFIG = buildSnakeCaseConfig();

    /**
     * Builds the job graph and submits it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {

        // Step 1: obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);  // in production, keep the parallelism equal to the Kafka topic's partition count

        // 1.1 enable checkpointing
        env.enableCheckpointing(5000L);
        env.setStateBackend(new HashMapStateBackend());

        // 1.2 optional checkpoint settings (disabled)
        //CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        //checkpointConfig.setCheckpointTimeout(10000L);
        //checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
        // keep the last checkpoint when the job is cancelled
        //checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
        // restart strategy
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));

        // Step 2: read the Kafka page topic as a stream (watermarks are assigned later, in step 4)
        DataStreamSource<String> kafkaDS = env.fromSource(
                KafkaUtil.getKafkaSource(Constant.TOPIC_DWD_TRAFFIC_PAGE, "home_detail_page_view_230524"),
                WatermarkStrategy.noWatermarks(),
                "kafka-source");

        // Step 3: filter and convert to JSON objects
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new HomeDetailPageFilter());

        // Step 4: extract the event timestamp and generate watermarks
        SingleOutputStreamOperator<JSONObject> jsonObjWithWMDS = jsonObjDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                // "ts" is guaranteed to be present by HomeDetailPageFilter
                                return element.getLong("ts");
                            }
                        }));

        // Step 5: key by mid, compute the unique-visitor flags and convert to the result bean
        // ("common.mid" is guaranteed to be present by HomeDetailPageFilter)
        KeyedStream<JSONObject, String> keyedStream =
                jsonObjWithWMDS.keyBy(json -> json.getJSONObject("common").getString("mid"));
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> trafficHomeDetailPageViewDS =
                keyedStream.flatMap(new DailyUvFlatMapFunction());

        // Step 6: window and aggregate
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> resultDS = trafficHomeDetailPageViewDS
                .windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TrafficHomeDetailPageViewBean>() {
                    @Override
                    public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {
                        // accumulate both counters into the first bean
                        value1.setHomeUvCt(value1.getHomeUvCt() + value2.getHomeUvCt());
                        value1.setGoodDetailUvCt(value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt());
                        return value1;
                    }
                }, new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                        // the reduce above leaves exactly one pre-aggregated bean per window;
                        // stamp it with the window bounds
                        TrafficHomeDetailPageViewBean detailPageViewBean = values.iterator().next();
                        detailPageViewBean.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        detailPageViewBean.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        out.collect(detailPageViewBean);
                    }
                });

        // Step 7: write the result out
        resultDS.print(">>>>>>");
        resultDS.map(bean -> JSON.toJSONString(bean, SNAKE_CASE_CONFIG))
                .sinkTo(DorisUtil.getDorisSink("gmall_230524.dws_traffic_home_detail_page_view_window"));

        // Step 8: launch the job
        env.execute("Dws03_TrafficHomeDetailPageViewWindow");

    }

    /**
     * Creates the fastjson configuration used when serializing result beans.
     *
     * @return a config whose property naming strategy is snake_case
     */
    private static SerializeConfig buildSnakeCaseConfig() {
        SerializeConfig config = new SerializeConfig();
        config.propertyNamingStrategy = PropertyNamingStrategy.SnakeCase;  // use underscores in JSON property names
        return config;
    }

    /**
     * Parses raw log lines and forwards only "home" / "good_detail" page views that carry
     * every field the downstream operators dereference ({@code page}, {@code ts},
     * {@code common.mid}). Records that cannot be parsed or lack one of those fields are
     * reported on stdout as dirty data and dropped, instead of failing the job with a
     * {@link NullPointerException} further down the pipeline.
     */
    private static class HomeDetailPageFilter implements FlatMapFunction<String, JSONObject> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<JSONObject> out) throws Exception {
            try {
                JSONObject jsonObject = JSONObject.parseObject(value);
                JSONObject page = jsonObject == null ? null : jsonObject.getJSONObject("page");
                if (page == null) {
                    System.out.println("脏数据：" + value);
                    return;
                }

                String pageId = page.getString("page_id");
                if (!"home".equals(pageId) && !"good_detail".equals(pageId)) {
                    // well-formed record for a page this job does not track
                    return;
                }

                // "ts" feeds the timestamp assigner and "common.mid" is the key;
                // a null in either would crash the job downstream
                JSONObject common = jsonObject.getJSONObject("common");
                if (jsonObject.getLong("ts") == null || common == null || common.getString("mid") == null) {
                    System.out.println("脏数据：" + value);
                    return;
                }

                out.collect(jsonObject);
            } catch (JSONException e) {
                System.out.println("脏数据：" + value);
            }
        }
    }

    /**
     * Per-key (per {@code mid}) function that emits a bean with a unique-visitor flag of 1
     * the first time the key is seen on the home page, or on the detail page, for a given date.
     * The last-seen date of each page is kept in keyed state with a one-day TTL that is
     * refreshed on create and write.
     */
    private static class DailyUvFlatMapFunction extends RichFlatMapFunction<JSONObject, TrafficHomeDetailPageViewBean> {

        private static final long serialVersionUID = 1L;

        // date of the key's most recent counted visit to each page; initialised in open()
        private transient ValueState<String> homeLastVisitDtState;
        private transient ValueState<String> detailLastVisitDtState;

        @Override
        public void open(Configuration parameters) throws Exception {

            StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(1))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .build();

            ValueStateDescriptor<String> homeStateDescriptor = new ValueStateDescriptor<>("last-home-dt", String.class);
            homeStateDescriptor.enableTimeToLive(ttlConfig);
            ValueStateDescriptor<String> detailStateDescriptor = new ValueStateDescriptor<>("last-detail-dt", String.class);
            detailStateDescriptor.enableTimeToLive(ttlConfig);

            homeLastVisitDtState = getRuntimeContext().getState(homeStateDescriptor);
            detailLastVisitDtState = getRuntimeContext().getState(detailStateDescriptor);
        }

        @Override
        public void flatMap(JSONObject value, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {

            String pageId = value.getJSONObject("page").getString("page_id");
            // NOTE(review): presumably a calendar-date string derived from the event time;
            // confirm against DateFormatUtil.toDate
            String curDt = DateFormatUtil.toDate(value.getLong("ts"));

            long homeUv = 0L;
            long detailUv = 0L;

            // the upstream filter only lets "home" and "good_detail" through,
            // so anything that is not "home" is a detail-page view
            ValueState<String> lastVisitDtState = "home".equals(pageId) ? homeLastVisitDtState : detailLastVisitDtState;
            String lastDt = lastVisitDtState.value();
            if (lastDt == null || !lastDt.equals(curDt)) {
                if ("home".equals(pageId)) {
                    homeUv = 1L;
                } else {
                    detailUv = 1L;
                }
                lastVisitDtState.update(curDt);
            }

            // emit nothing when both flags are 0
            if (homeUv == 1L || detailUv == 1L) {
                // the first two arguments are placeholders; the window function sets stt/edt later
                out.collect(new TrafficHomeDetailPageViewBean("", "", curDt, homeUv, detailUv));
            }
        }
    }
}
